package com.geccocrawler.gecco;

import com.alibaba.fastjson.JSON;
import com.geccocrawler.gecco.assertion.AssertionFactory;
import com.geccocrawler.gecco.downloader.proxy.FileProxys;
import com.geccocrawler.gecco.downloader.proxy.Proxys;
import com.geccocrawler.gecco.dynamic.DynamicGecco;
import com.geccocrawler.gecco.dynamic.GeccoClassLoader;
import com.geccocrawler.gecco.exhand.DownLoadExhandFactory;
import com.geccocrawler.gecco.listener.EventListener;
import com.geccocrawler.gecco.monitor.GeccoJmx;
import com.geccocrawler.gecco.monitor.GeccoMonitor;
import com.geccocrawler.gecco.pipeline.PipelineFactory;
import com.geccocrawler.gecco.request.HttpGetRequest;
import com.geccocrawler.gecco.request.HttpRequest;
import com.geccocrawler.gecco.request.StartRequestList;
import com.geccocrawler.gecco.scheduler.NoLoopStartScheduler;
import com.geccocrawler.gecco.scheduler.Scheduler;
import com.geccocrawler.gecco.scheduler.StartScheduler;
import com.geccocrawler.gecco.spider.Spider;
import com.geccocrawler.gecco.spider.SpiderBeanFactory;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.google.common.io.Resources;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;

/**
 * Crawler engine. Ideally each engine runs in its own process; in a distributed crawling setup a
 * dedicated crawler server can be assigned to it. The engine is composed of four main modules:
 * Scheduler, Downloader, Spider and SpiderBeanFactory.
 *
 * <p>The engine is itself a {@link Thread}: configure it through the fluent setters, then call
 * {@link #start()} or {@link #engineStart()}. {@link #run()} spawns {@code threadCount} spider
 * threads and, in non-loop mode, blocks until all of them report completion.
 *
 * @author huchengyi
 */
public class GeccoEngine extends Thread {

    private static final Logger log = LoggerFactory.getLogger(GeccoEngine.class);

    private String engineId;

    private Date startTime;

    private List<HttpRequest> startRequests = Lists.newArrayList();

    /** Start requests re-queued into the scheduler each time all spiders go idle in loop mode. */
    private List<HttpRequest> loopRequest = Lists.newArrayList();

    private Scheduler scheduler;

    private SpiderBeanFactory spiderBeanFactory;

    private PipelineFactory pipelineFactory;

    private AssertionFactory assertionFactory;

    private DownLoadExhandFactory downLoadExhandFactory;

    /**
     * Spiders created by {@link #run()}. Written once by the engine thread and read by other threads
     * (pause/restart/stop), hence volatile; the list is only published after it is fully built.
     */
    private volatile List<Spider> spiders;

    private String classpath;

    private int threadCount;

    /** Counted down once per spider via {@link #notifyComplete()}; created in {@link #run()}. */
    private CountDownLatch cdl;

    private int interval;

    private int intervalBase = 0;

    private Proxys proxysLoader;

    private boolean proxy = true;

    private boolean loop;

    private boolean mobile;

    private int retry;

    private EventListener eventListener;

    private boolean clickInCurrentThread;

    private boolean allWait = false;

    private GeccoEngine() {
        this.retry = 3;
    }

    /**
     * Creates an engine with no classpath; the classpath (or a spider bean factory) must be supplied
     * before running. This factory must not be used when rules are configured dynamically.
     *
     * @return GeccoEngine
     */
    public static GeccoEngine create() {
        GeccoEngine geccoEngine = new GeccoEngine();
        geccoEngine.setName("GeccoEngine");
        return geccoEngine;
    }

    public static GeccoEngine create(String classpath) {
        return create(classpath, null);
    }

    public static GeccoEngine create(String classpath, PipelineFactory pipelineFactory) {
        return create(classpath, pipelineFactory, null);
    }

    public static GeccoEngine create(String classpath, PipelineFactory pipelineFactory, AssertionFactory assertionFactory) {
        return create(classpath, pipelineFactory, assertionFactory, null);
    }

    /**
     * Creates an engine whose spider bean factory scans {@code classpath}.
     *
     * @param classpath package to scan for spider beans; must not be empty
     * @throws IllegalArgumentException if {@code classpath} is empty
     */
    public static GeccoEngine create(String classpath, PipelineFactory pipelineFactory, AssertionFactory assertionFactory, DownLoadExhandFactory downLoadExhandFactory) {
        if (StringUtils.isEmpty(classpath)) {
            throw new IllegalArgumentException("classpath cannot be empty");
        }
        GeccoEngine ge = create();
        // Remember the classpath too: run() uses it for spider thread names and the JMX export name.
        ge.classpath = classpath;
        ge.spiderBeanFactory = new SpiderBeanFactory(classpath, pipelineFactory, assertionFactory, downLoadExhandFactory);
        return ge;
    }

    /**
     * Adds one or more comma-separated URLs as GET start requests. Blank segments are ignored.
     */
    public GeccoEngine start(String url) {
        for (String part : url.split(",")) {
            if (StringUtils.isBlank(part)) {
                continue;
            }
            start(new HttpGetRequest(part.trim()));
        }
        return this;
    }

    public GeccoEngine start(String... urls) {
        for (String url : urls) {
            start(url);
        }
        return this;
    }

    /**
     * Adds a start request after normalizing its URL: surrounding whitespace is trimmed and a single
     * trailing {@code '/'} is removed.
     */
    public GeccoEngine start(HttpRequest request) {
        // Trim first and compute the cut on the trimmed string; using the untrimmed length failed to
        // strip the slash (or threw) when the URL had surrounding whitespace.
        String url = request.getUrl().trim();
        if (url.endsWith("/")) {
            url = url.substring(0, url.length() - 1);
        }
        request.setUrl(url);
        this.startRequests.add(request);
        return this;
    }

    public GeccoEngine start(List<HttpRequest> requests) {
        for (HttpRequest request : requests) {
            start(request);
        }
        return this;
    }

    public GeccoEngine scheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
        return this;
    }

    public GeccoEngine thread(int count) {
        this.threadCount = count;
        return this;
    }

    public GeccoEngine engineId(String id) {
        this.engineId = id;
        return this;
    }

    public GeccoEngine interval(int interval) {
        this.interval = interval;
        return this;
    }

    public GeccoEngine intervalBase(int intervalBase) {
        this.intervalBase = intervalBase;
        return this;
    }

    public GeccoEngine retry(int retry) {
        this.retry = retry;
        return this;
    }

    public GeccoEngine loop(boolean loop) {
        this.loop = loop;
        return this;
    }

    public GeccoEngine clickInCurrentThread(boolean clickInCurrentThread) {
        this.clickInCurrentThread = clickInCurrentThread;
        return this;
    }

    public GeccoEngine proxysLoader(Proxys proxysLoader) {
        this.proxysLoader = proxysLoader;
        return this;
    }

    public GeccoEngine proxy(boolean proxy) {
        this.proxy = proxy;
        return this;
    }

    public GeccoEngine mobile(boolean mobile) {
        this.mobile = mobile;
        return this;
    }

    public GeccoEngine classpath(String classpath) {
        this.classpath = classpath;
        return this;
    }

    public GeccoEngine pipelineFactory(PipelineFactory pipelineFactory) {
        this.pipelineFactory = pipelineFactory;
        return this;
    }

    public GeccoEngine assertionFactory(AssertionFactory assertionFactory) {
        this.assertionFactory = assertionFactory;
        return this;
    }

    public GeccoEngine spiderBeanFactory(SpiderBeanFactory spiderBeanFactory) {
        this.spiderBeanFactory = spiderBeanFactory;
        return this;
    }

    public GeccoEngine downLoadExhandFactory(DownLoadExhandFactory downLoadExhandFactory) {
        this.downLoadExhandFactory = downLoadExhandFactory;
        return this;
    }

    public void register(Class<?> spiderBeanClass) {
        getSpiderBeanFactory().addSpiderBean(spiderBeanClass);
    }

    public void unregister(Class<?> spiderBeanClass) {
        getSpiderBeanFactory().removeSpiderBean(spiderBeanClass);
        DynamicGecco.unregister(spiderBeanClass);
    }

    /**
     * Engine thread body: fills in defaults, queues the start requests (including those from
     * {@code starts.json}), launches the spider threads, registers monitoring/JMX and, in non-loop
     * mode, waits for all spiders to finish before shutting down.
     *
     * @throws IllegalArgumentException if neither a spider bean factory nor a classpath was supplied
     */
    @Override
    public void run() {
        if (proxysLoader == null) {
            // Default to the file-based proxy source.
            proxysLoader = new FileProxys();
        }
        if (scheduler == null) {
            if (loop) {
                scheduler = new StartScheduler();
            } else {
                scheduler = new NoLoopStartScheduler();
            }
        }
        if (spiderBeanFactory == null) {
            if (StringUtils.isEmpty(classpath)) {
                throw new IllegalArgumentException("classpath cannot be empty");
            }
            spiderBeanFactory = new SpiderBeanFactory(classpath, pipelineFactory, assertionFactory, downLoadExhandFactory);
        }
        if (threadCount <= 0) {
            threadCount = 1;
        }
        this.cdl = new CountDownLatch(threadCount);
        startsJson();

        for (HttpRequest startRequest : startRequests) {
            scheduler.into(startRequest);
            loopRequest.add(startRequest);
        }

        List<Spider> spiderList = new ArrayList<>(threadCount);
        List<Thread> threads = new ArrayList<>(threadCount);
        for (int i = 0; i < threadCount; i++) {
            Spider spider = new Spider(this);
            spiderList.add(spider);
            threads.add(new Thread(spider, "T" + classpath + i));
        }
        // Publish only the fully populated list so other threads never observe a partial one.
        spiders = spiderList;
        for (Thread thread : threads) {
            thread.start();
        }
        startTime = new Date();
        // Monitor basic crawler information.
        GeccoMonitor.monitor(this);
        // Export JMX information.
        GeccoJmx.export(classpath);
        // In non-loop mode, wait for the spider threads to finish and then shut down.
        closeUnitlComplete();
    }

    /**
     * Assigns a random engine id if none was configured, fires {@code onStart}, then starts the
     * engine thread.
     */
    @Override
    public synchronized void start() {
        if (StringUtils.isEmpty(engineId)) {
            engineId = UUID.randomUUID().toString().replaceAll("-", "");
            log.info("engineId is null ,create the id is {}", engineId);
        }
        if (eventListener != null) {
            eventListener.onStart(this);
        }
        super.start();
    }

    /**
     * Adds the start requests listed in the classpath resource {@code starts.json}, if present.
     * The resource is read through its URL so it also works when packaged inside a jar.
     */
    private GeccoEngine startsJson() {
        try {
            // Guava throws IllegalArgumentException when the resource does not exist.
            URL url = Resources.getResource("starts.json");
            String json = Resources.toString(url, Charset.forName("UTF-8"));
            List<StartRequestList> list = JSON.parseArray(json, StartRequestList.class);
            if (list != null) {
                for (StartRequestList start : list) {
                    start(start.toRequest());
                }
            }
        } catch (IllegalArgumentException ex) {
            log.info("starts.json not found: {}", ex.getMessage());
        } catch (IOException ioex) {
            log.error("", ioex);
        }
        return this;
    }

    public Scheduler getScheduler() {
        return scheduler;
    }

    public SpiderBeanFactory getSpiderBeanFactory() {
        return spiderBeanFactory;
    }

    public int getIntervalBase() {
        return intervalBase;
    }

    public int getInterval() {
        return interval;
    }

    public Date getStartTime() {
        return startTime;
    }

    public List<HttpRequest> getStartRequests() {
        return startRequests;
    }

    /** @return the spiders created by {@link #run()}, or {@code null} before the engine has run */
    public List<Spider> getSpiders() {
        return spiders;
    }

    public int getRetry() {
        return retry;
    }

    public int getThreadCount() {
        return threadCount;
    }

    public boolean isLoop() {
        return loop;
    }

    public Proxys getProxysLoader() {
        return proxysLoader;
    }

    public boolean isMobile() {
        return mobile;
    }

    public boolean isProxy() {
        return proxy;
    }

    /**
     * Called by a spider thread to tell the engine it has finished.
     */
    public void notifyComplete() {
        this.cdl.countDown();
    }

    /**
     * In non-loop mode, waits for all spider threads to finish, then closes the downloaders,
     * unexports JMX and fires {@code onStop}. In loop mode the engine keeps crawling after
     * {@link #run()} returns, so this is a no-op; stopping is done via {@link #engineStop()}.
     */
    public void closeUnitlComplete() {
        if (loop) {
            return;
        }
        try {
            cdl.await();
        } catch (InterruptedException e) {
            log.error("", e);
            // Preserve the interrupt status for code further up this thread's stack.
            Thread.currentThread().interrupt();
        }
        if (spiderBeanFactory != null) {
            spiderBeanFactory.getDownloaderFactory().closeAll();
        }
        GeccoJmx.unexport();
        log.info("close gecco!");

        if (eventListener != null) {
            eventListener.onStop(this);
        }
    }

    /**
     * Starts the engine and returns it.
     *
     * @return GeccoEngine
     */
    public GeccoEngine engineStart() {
        start();
        return this;
    }

    /**
     * Pauses every spider and fires {@code onPause}.
     */
    public void pause() {
        List<Spider> current = spiders;
        if (current != null) {
            for (Spider spider : current) {
                spider.pause();
            }
        }
        if (eventListener != null) {
            eventListener.onPause(this);
        }
    }

    /**
     * Resumes crawling on every spider and fires {@code onRestart}.
     */
    public void restart() {
        List<Spider> current = spiders;
        if (current != null) {
            for (Spider spider : current) {
                spider.restart();
            }
        }
        if (eventListener != null) {
            eventListener.onRestart(this);
        }
    }

    /**
     * Prepares for a rule change: pauses the engine and recreates the class loader.
     */
    public void beginUpdateRule() {
        if (log.isDebugEnabled()) {
            log.debug("begin update rule");
        }
        // Rules must not change while spiders are running, and a fresh ClassLoader is needed.
        pause();
        GeccoClassLoader.create();
    }

    /**
     * Finishes a rule change by restarting the engine.
     */
    public void endUpdateRule() {
        restart();
        if (log.isDebugEnabled()) {
            log.debug("end update rule");
        }
    }

    /**
     * Stops every spider and fires {@code onStop}.
     */
    public void engineStop() {
        List<Spider> current = spiders;
        if (current != null) {
            for (Spider spider : current) {
                spider.stop();
            }
        }
        if (eventListener != null) {
            eventListener.onStop(this);
        }
    }

    public EventListener getEventListener() {
        return eventListener;
    }

    public GeccoEngine setEventListener(EventListener eventListener) {
        this.eventListener = eventListener;
        return this;
    }

    public boolean isClickInCurrentThread() {
        return clickInCurrentThread;
    }

    /**
     * Reports whether every spider has ended or is waiting. Once true in non-loop mode the result
     * sticks. In loop mode, when all spiders are idle the start requests are re-queued and
     * {@code false} is returned so crawling continues.
     *
     * @return {@code false} if the engine has not created its spiders yet
     */
    public boolean isAllWait() {
        if (allWait) {
            return true;
        }
        List<Spider> current = spiders;
        if (current == null) {
            return false;
        }
        boolean idle = true;
        for (Spider spider : current) {
            if (!(spider.isEnd() || spider.isWaiting())) {
                idle = false;
                break;
            }
        }
        if (isLoop() && idle) {
            for (HttpRequest request : loopRequest) {
                scheduler.into(request);
            }
            allWait = false;
        } else {
            allWait = idle;
        }
        return allWait;
    }

    public String getEngineId() {
        return engineId;
    }
}
